package com.geomesa.spark.SparkCore

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.CreateArray
import org.apache.spark.sql.types.{ArrayType, DataTypes, StringType, StructField, StructType}
import org.locationtech.geomesa.spark.jts.util.WKTUtils
import org.locationtech.jts.geom.GeometryFactory

import scala.collection.mutable

object test {

  def main(args: Array[String]): Unit = {
    import org.locationtech.geomesa.spark.jts._
    //spark
    val spark: SparkSession = {
      SparkSession.builder()
        .appName("test")
        .master("local[*]")
        .getOrCreate()
        //需注入spark.jts._包
        .withJTS
    }

    val schema = StructType(Array(
      StructField("crs", StringType),
      StructField("features", ArrayType(
        StructType(Array(StructField("geometry",
          StructType(Array(StructField("coordinates",
            ArrayType(DataTypes.createArrayType(ArrayType((DataTypes.DoubleType)))))
          )))))))
    ))

    val schema1 = StructType(Array(
      StructField("crs", StringType),
      StructField("features", ArrayType(
        StructType(Array(StructField("geometry",
          StructType(Array(StructField("coordinates", StringType)
          )))))))
    ))

    val schema2 = StructType(Seq(
      StructField("crs", StringType),
      StructField("features", ArrayType(
        StructType(Seq(StructField("geometry",
          StructType(Seq(StructField("coordinates", StringType)
          )))))))
    ))


    /*    root
       |-- crs: struct (nullable = true)
       |    |-- properties: struct (nullable = true)
       |    |    |-- name: string (nullable = true)
       |    |-- type: string (nullable = true)
       |-- features: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- geometry: struct (nullable = true)
       |    |    |    |-- coordinates: array (nullable = true)
       |    |    |    |    |-- element: array (containsNull = true)
       |    |    |    |    |    |-- element: array (containsNull = true)
       |    |    |    |    |    |    |-- element: double (containsNull = true)
       |    |    |    |-- type: string (nullable = true)
       |    |    |-- geometry_name: string (nullable = true)
       |    |    |-- id: string (nullable = true)*/

    val dataFile = this.getClass.getClassLoader.getResource("linepp.txt").getPath
    val df = spark.read
      .schema(schema2)
      .json(dataFile)
    //.show(5, false)
    //.printSchema()

    val geo = new GeometryFactory()
    import spark.implicits._
    val newDF = df.withColumn("features", $"features")
      .withColumn("geometry", $"features.geometry")
      .withColumn("coordinates", $"geometry.coordinates")
    //.show()

    /*    +--------------------+--------------------+--------------------+--------------------+
        |                 crs|            features|            geometry|         coordinates|
        +--------------------+--------------------+--------------------+--------------------+
        |{"type":"name","p...|[[[[[[1.357846020...|[[[[[1.3578460201...|[[[[1.35784602018...|
          +--------------------+--------------------+--------------------+--------------------+*/

    import spark.sql
    //println(row.getAs[mutable.WrappedArray[String]]("coordinates"))

    var line = ""
    newDF.collect().foreach(r => {
      line = r.get(2).toString
    })
    val multiLine = line.replace("WrappedArray", "MULTILINESTRING")
      .replace(",", " ")
      .replace("[[[[", "(")
      .replace("]]]]", ")")
      .replace("] [", "),(")
      .replace(")),((", "),(")
      .replace("),(", ",")
      .replace(")  (", "),(")
      .replace("],[", ",")

    println(multiLine)

    val line1 = multiLine.replace("((", "(")
      .replace("))", ")")
      .replace("MULTILINESTRING","LINESTRING")

    val expected = WKTUtils.read(multiLine)
    val expected1 = WKTUtils.read(line1)
    println(expected.getGeometryType)
    println(expected1.getGeometryType)

    val r = newDF.withColumn("multiLine", st_mLineFromText(expected.toString)).withColumn("point", st_makePoint(121.97738334, 37.47710858))
    r.withColumn("distance", st_distance($"multiLine", $"point"))
      .withColumn("lineString",st_lineFromText(expected1.toString))
      .withColumn("dis",st_distance($"lineString",$"point"))
      .show()

  }
}
